<?php

namespace CrontabWorker;


use \CrontabWorker\MsgQueue;

class CrontabWorker {

    private $_running = true;
    // 进程
    static public $workers = null;

    // 任务
    static public $tasks = array();

    // 消息队列
    static public $queue = null;

    // config目录下的所有配置
    static public $config = array();

    // 主进程ID
    private $pidFile;

    // 记录任务状态的文件
    private $taskStatusFile;

    //进程名称
    public $title = 'php-crontab';

    public $output = '/dev/null';

    // 启用worker数
    public $proccessNum = 5;

    public $daemon = false;

    // 进程运行多少次自动回收
    public $proccessRunNum = 10000;

    public function __construct()
    {
        date_default_timezone_set("Asia/Shanghai");

        $this->pidFile = TMP_PATH . basename($_SERVER['PHP_SELF']) . '.pid';

        $this->taskStatusFile = TMP_PATH . basename($_SERVER['PHP_SELF']) . '.task';
    }

    /**
     * 检查环境
     */
    public function checkEnv()
    {
        if (version_compare("5.3", PHP_VERSION, ">")) {
            die("PHP版本至少要大于5.3\n");
        }
        if (!function_exists('pcntl_fork')) {
            die('需要安装PHP的pcntl扩展\n');
        }
        if (!function_exists('pcntl_signal_dispatch')) {
            declare(ticks = 1);
        }
        if (!is_dir(TMP_PATH)) {
            @mkdir(TMP_PATH, 0755, true);
        } 
    }

    public function run()
    {

        $this->checkEnv();

        // 接收命令
        if (false === $this->runCommand()) {
            return ;
        }

        if ($this->daemon) {
            $this->daemonize();
        }

        $this->initConfig();

        $this->installSignalHandler();

        self::$queue = MsgQueue::create(ftok(__FILE__, 'a'));

        while ($this->_running) {
            pcntl_alarm(1);
            if (function_exists('pcntl_signal_dispatch')) {
                pcntl_signal_dispatch();
            }
            sleep(1);
        }

        $this->close();
        
    }

    public function close()
    {
        
        if (!is_null(self::$queue)) {
            self::$queue->clear();
        }

        if (is_file($this->pidFile)) {
            @unlink($this->pidFile);
        }

        if (is_file($this->taskStatusFile)) {
            @unlink($this->taskStatusFile);
        }

        posix_kill(0, SIGKILL);
    }

    public function runCommand()
    {
        global $argv;


        // start
        if (count($argv) < 2) {
            return true;
        }

        $command = strtolower($argv[1]);

        if ($command == 'start') {
            return true;
        }

        if ($command == 'stop') {
            if (is_file($this->pidFile)) {
                $pid = file_get_contents($this->pidFile);
                posix_kill($pid, SIGINT);
            }
            return false;
        }

        if ($command == 'status') {
            $this->showStatus();
            return false;
        }


        return false;
    }

    /**
     * 守护进程化
     */
    public function daemonize()
    {
        global $stdin, $stdout, $stderr;

        // 只允许在cli下面运行
        if (php_sapi_name() != "cli"){
            die("only run in command line mode\n");
        }

        if (is_file($this->pidFile)) {
            die("faild. server is running! plz stop\n");
        }

        $pid = pcntl_fork();
        if ($pid == -1)
        {
            die("fork(1) failed!\n");
        }
        elseif ($pid > 0)
        {
            //让由用户启动的进程退出
            exit(0);
        }
        //建立一个有别于终端的新session以脱离终端
        posix_setsid();

        $pid = pcntl_fork();
        if ($pid == -1)
        {
            die("fork(2) failed!\n");
        }
        elseif ($pid > 0)
        {
            //让由用户启动的进程退出
            exit(0);
        }

        //关闭打开的文件描述符
        // fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);

        $stdin  = fopen($this->output, 'r');
        $stdout = fopen($this->output, 'a');
        $stderr = fopen($this->output, 'a');
        // 设置当前进程的标题 mac下不生效
        @cli_set_process_title($this->title);

        if ($this->output != '/dev/null' && !is_file($this->output)) {
            @touch($this->output);
            chmod($this->output, 0666);
        }

        $this->createPidFile();
    }

    /**
     * 向控制台输出任务状态
     */
    public function showStatus()
    {
        if (!is_file($this->pidFile)) {
            die("任务未运行\n");
        }


        $proccessInfo = str_pad('=', 70,'=') . "\n";
        $proccessInfo .= "PPID: ".file_get_contents($this->pidFile)."\n";
        $proccessInfo .= "RUN_TIME: " .date('Y-m-d H:i',filemtime($this->pidFile))."\n";
        $proccessInfo .= "PROCCESS_TITLE: ".$this->title."\n";
        $proccessInfo .= "PROCCESS: " .$this->proccessNum."\n";
        $proccessInfo .= str_pad('=', 70,'=') . "\n";

        $status = json_decode(file_get_contents($this->taskStatusFile),true);

        $crontabTitle = str_pad('NAME', 15) . str_pad('COMMAND', 10) . str_pad('COUNT', 10) . str_pad('LAST_TIME', 20) . str_pad('NEXT_TIME', 15) . "\n";

        $body = '';
        foreach ($status as $v) {
            $chineseStr = preg_replace('/[^\x{4e00}-\x{9fa5}]/u', '', $v['name']);

            $v['lastTime'] = isset($v['lastTime']) ? date('m-d H:i:s',$v['lastTime']) : 'none';

            $body .= (str_pad($v['name'], 15 + mb_strlen($chineseStr))
                                . str_pad($v['command'], 10)
                                . str_pad($v['count'], 10)
                                . str_pad($v['lastTime'], 20)
                                . str_pad(date('m-d H:i:s',$v['nextTime']), 15))
                                . "\n";
        }

        $body .= str_pad('=', 70,'=') . "\n\n";

        echo $proccessInfo . $crontabTitle . $body;

    }
    /**
     * 创建进程pid文件,用于确定任务运行状态
     */
    public function createPidFile()
    {
        file_put_contents($this->pidFile, getmypid());
    }
    public function initConfig()
    {
        $files = glob(ROOT_PATH . 'config/*.php');
        foreach ($files as $file) {
            $key = substr(basename($file), 0,-4);
            self::$config[$key] = C($key);
        }
    }

    public function installSignalHandler()
    {
        pcntl_signal(SIGINT, array(__CLASS__, 'signalHandler'),false);
        pcntl_signal(SIGQUIT, array(__CLASS__, 'signalHandler'),false);
        pcntl_signal(SIGCHLD, array(__CLASS__, "signalHandler")); 
        pcntl_signal(SIGALRM, array(__CLASS__, 'signalHandler'));
    }

    public function signalHandler($signo){
 
        switch($signo){

            // 心跳机制
            case SIGALRM:
                // 监视子进程情况
                $this->watchProccess();
                // 拉取任务
                $this->watchTask();
                pcntl_alarm(1);
                break;
            //子进程结束信号
            case SIGCHLD:
                while(($pid=pcntl_waitpid(-1, $status, WNOHANG)) > 0){
                    unset(self::$workers[$pid]);
                    echo "exit $pid\n";
                }
                break;
            //中断进程
            case SIGQUIT:
            case SIGINT:
                $this->_running = false;
                break;
            default:
                return false;
        }
 
    }

    

    /**
     * 解析设定,计算下次运行的时间
     * @param  [type]  $tag 
     * @param  [type]  $timer 
     */
    public function calcNextTime($tag, $timer)
    {
        $nextTime = false;
        // 指定每天运行日期  格式 00:00
        if ($tag == 'at' && strlen($timer) == 5) {
            if (time() >= strtotime($timer)) {
                $nextTime = strtotime($timer . " +1day");
            } 
            else {
                $nextTime = strtotime($timer);
            }
        }

        $timer = intval($timer);
        // 按秒
        if ($tag == 's' && $timer > 0) {
            $nextTime = time() + $timer;
        }

        // 按分钟
        if ($tag == 'i' && $timer > 0) {
            $nextTime = time() + $timer * 60;
        }

        // 按小时
        if ($tag == 'h' && $timer > 0) {
            $nextTime = time() + $timer * 60 * 60;
        }

        return $nextTime;
    }


    /**
     * 添加定时任务
     * @param $name 任务名称
     * @param $tags 指令[at:指定每天运行时间|h:每h小时|i:每i分钟|s:每s秒]
     * @param $timer 
     * @param $settings
     */
    public function addInterval($name, $command, $callable, $args=array())
    {
        $exp_command = strtolower($command);
        list($tag, $timer) = explode('@', $exp_command);
        if (!in_array($tag, array('at','s','i','h'))) {
            return false;
        }
        // 函数或静态方法
        if (!is_callable($callable)) {
            return false;
        }

        $task['name'] = $name;
        $task['count'] = 0;
        $task['callable'] = $callable;
        $task['args'] = $args;
        $task['tag'] = $tag;
        $task['timer'] = $timer;
        $task['command'] = $command;

        self::$tasks[] = $task;
    }

    

    /**
     * 创建子进程
     */
    public function openProccess()
    {
        $pid = pcntl_fork();
        if ($pid > 0) {
            self::$workers[$pid] = $pid;
        } elseif ($pid == 0) {
            pcntl_signal(SIGINT, SIG_DFL);
            pcntl_signal(SIGCHLD, SIG_DFL);
            $this->taskPop();
            exit();
        }
    }

    /**
     * 监视进程情况
     */
    public function watchProccess()
    {
        // 创建进程
        while ($this->proccessNum > count(self::$workers)) {
            $this->openProccess();
        }

        // 读取进程信息,写入文件
        
    }

    /**
     * 解析任务发送到消息队列里,分发给子进程
     */
    public function watchTask()
    {
        foreach (self::$tasks as &$task) {

            if (empty($task['nextTime'])) {
                $task['nextTime'] = $this->calcNextTime($task['tag'], $task['timer']);
            }

            if (!empty($task['nextTime']) && time() >= $task['nextTime']) {
                self::$queue->push(json_encode($task));
                $task['count']++;
                $task['lastTime'] = $task['nextTime'];
                $task['nextTime'] = $this->calcNextTime($task['tag'], $task['timer']);
            }

        }

        // 读取任务状态,写入文件
        file_put_contents($this->taskStatusFile, json_encode(self::$tasks));
    }

    /**
     * 处理消息队列里的任务
     */
    public function taskPop()
    {
        $count = 0;
        while ($this->proccessRunNum != $count) {
            if (($msg = self::$queue->pop()) !== false) {
                $task = json_decode($msg, true);
                call_user_func_array($task['callable'], $task['args']);
                $count++;
            }
        }
    }
}
